package com.atguigu

import java.net.InetSocketAddress
import java.util

import com.alibaba.fastjson.JSONObject
import com.alibaba.otter.canal.client.{CanalConnector, CanalConnectors}
import com.alibaba.otter.canal.protocol.CanalEntry.{EntryType, EventType, RowChange}
import com.alibaba.otter.canal.protocol.{CanalEntry, Message}
import com.atguigu.gmall.common.Constant
import com.google.protobuf.ByteString

import scala.collection.JavaConverters._
import scala.util.Random

/**
 * Author atguigu
 * Date 2020/10/17 15:49
 */
object CanalClient {

    // Shared RNG for the simulated send delay; avoids allocating a new
    // Random per row as the original code did.
    private val random = new Random()

    /**
     * Routes one batch of row changes to the matching Kafka topic.
     *
     * Only INSERT events are forwarded, and only for the `order_info` and
     * `order_detail` tables; everything else (updates, deletes, other
     * tables, empty batches) is silently ignored.
     *
     * @param tableName   name of the table the change came from
     * @param eventType   canal event type (INSERT / UPDATE / DELETE ...)
     * @param rowDataList changed rows parsed out of the binlog entry
     */
    def handleRowData(tableName: String,
                      eventType: CanalEntry.EventType,
                      rowDataList: util.List[CanalEntry.RowData]): Unit = {
        // Single guard replaces the duplicated predicates of the original
        // if / else-if chain; behavior is unchanged.
        if (!rowDataList.isEmpty && eventType == EventType.INSERT) {
            tableName match {
                case "order_info"   => handleData(rowDataList, Constant.ORDER_INFO_TOPIC)
                case "order_detail" => handleData(rowDataList, Constant.ORDER_DETAIL_TOPIC)
                case _              => // other tables are intentionally dropped
            }
        }
    }

    /**
     * Serializes each row's after-image columns to a JSON object and sends
     * it to the given Kafka topic.
     *
     * Each row is sent from its own thread after a random 0-9 second delay —
     * this looks like a deliberate simulation of out-of-order arrival.
     * NOTE(review): one raw Thread per row is unbounded; replace with a
     * shared ExecutorService if real volume is expected — confirm intent.
     *
     * @param rowDataList rows to serialize and publish
     * @param topic       destination Kafka topic
     */
    private def handleData(rowDataList: util.List[CanalEntry.RowData], topic: String): Unit = {
        for (rowData <- rowDataList.asScala) {
            val obj = new JSONObject()
            // After-image = column values as they stand AFTER the insert.
            for (column <- rowData.getAfterColumnsList.asScala) {
                obj.put(column.getName, column.getValue)
            }
            // Send to Kafka asynchronously with a randomized delay.
            new Thread() {
                override def run(): Unit = {
                    Thread.sleep(random.nextInt(10) * 1000)
                    MyKafkaUtil.sendToKafka(topic, obj.toJSONString)
                }
            }.start()
        }
    }

    /**
     * Entry point: connects to the canal server, subscribes to every table
     * in the gmall0523 database, and polls forever, dispatching ROWDATA
     * entries to [[handleRowData]].
     */
    def main(args: Array[String]): Unit = {
        // 1. Connect to the canal server.
        val addr = new InetSocketAddress("hadoop102", 11111)
        // 1.1 Build a single-node connector against the "example" destination
        //     with empty credentials.
        val conn: CanalConnector = CanalConnectors.newSingleConnector(addr, "example", "", "")
        // 1.2 Open the connection.
        conn.connect()
        // 1.3 Subscribe; filter is a regex over schema.table names.
        conn.subscribe("gmall0523.*")
        // 2. Poll loop: get(100) pulls at most 100 rows' worth of changes
        //    per call (and, unlike getWithoutAck, acks automatically).
        while (true) {
            val msg: Message = conn.get(100)
            val entries: util.List[CanalEntry.Entry] = msg.getEntries
            if (!entries.isEmpty) {
                // Only ROWDATA entries carry row changes; transaction
                // begin/end markers are skipped by the filter below.
                for (entry <- entries.asScala
                     if entry != null && entry.hasEntryType && entry.getEntryType == EntryType.ROWDATA) {
                    val storeValue: ByteString = entry.getStoreValue
                    val rowChange: RowChange = RowChange.parseFrom(storeValue)
                    handleRowData(entry.getHeader.getTableName, rowChange.getEventType, rowChange.getRowDatasList)
                }
            } else {
                // Nothing pulled — back off for 3 seconds before retrying.
                println("没有拉到数据, 3s之后继续拉....")
                Thread.sleep(3000)
            }
        }
    }
}
